package com.starsky.common.rabbitmq.service;

import com.starsky.common.data.MQRstData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;

/**
 * 消息发送
 */
@Component
public class RabbitProduct implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    private Logger log = LoggerFactory.getLogger(RabbitProduct.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private RabbitMQService rabbitMQService;

    /* 重发限制次数 */
    private static final Integer RETRY_LIMIT = 3;

    /**
     * 启消息确认机制,即确认消息已发送到交换机(Exchange)，用户查看手动、自动消息确认过程
     * 发送失败数据，放入缓存，需要接收该数据的服务自行从缓存取出数据，保存到数据库即可
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //todo 消息发送后，通过ack判断,执行业务操作
        System.out.println("ConfirmCallback:     " + "相关数据：" + correlationData);
        System.out.println("ConfirmCallback:     " + "确认情况：" + ack);
        System.out.println("ConfirmCallback:     " + "原因：" + cause);

        /* 消息唯一标识ID，第一次发送时生成，失败重发时重用 */
        String id = correlationData.getId();
        log.info(">>>>>>>>消息ID：" + id + "，发送结果：" + ack + "，原因：" + cause);
        /* 发送成功，删除消息缓存 */
        if (ack) {
            log.info(">>>>>>>>消息发送成功，ID：{}", id);
            //todo 执行业务操作
        }
    }

    /**
     * 消息发送成功确认
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("ReturnCallback:     " + "消息：" + message);
        System.out.println("ReturnCallback:     " + "回应码：" + replyCode);
        System.out.println("ReturnCallback:     " + "回应信息：" + replyText);
        System.out.println("ReturnCallback:     " + "交换机：" + exchange);
        System.out.println("ReturnCallback:     " + "路由键：" + routingKey);
    }

    /**
     * @desc: 发送消息到mq
     * @author: wangsh
     * @time: 2020/11/14 14:35
     * @author: exchange：交换机
     * @author: routingKey：路由
     * @author: queue：队列名称
     * @author: mqType：数据类型（1-事件）
     * @author: msg: 发送内容，json字符串
     */
    public void sendMsgToMQ(String exchange, String routingKey, String queue, int mqType, String msg) {

        log.info(" >>>>>>>>>>>>>>>> 发送消息 start>>>>>>>>>>>>>>>> ");
        log.info(">>>>>>>>>>>>>>>>发送消息内容 : " + msg);
        log.info(">>>>>>>>>>>>>>>>exchange : " + exchange);
        log.info(">>>>>>>>>>>>>>>>routingKey : " + routingKey);
        log.info(">>>>>>>>>>>>>>>>queue : " + queue);
        String messageId = UUID.randomUUID().toString().replaceAll("-", "");
        CorrelationData correlation = new CorrelationData(messageId);

        try {
            //设置回调函数
            rabbitTemplate.setConfirmCallback(this);
            //动态绑定队列及key
//            rabbitMQService.createQueue(exchange, queue, routingKey);
            //组装消息体
            MQRstData mqRstData = MQRstData.getInstance();
            mqRstData.setType(mqType);
            mqRstData.setResult(msg);
            //发送消息
            rabbitTemplate.convertAndSend(exchange, routingKey, mqRstData, correlation);
        } catch (AmqpException e) {
            e.printStackTrace();
            log.error(">>>>>>>>>>>>>>>>发送消息失败 : {} ", e);
        }
        log.info(" >>>>>>>>>>>>>>>> 发送消息 End >>>>>>>>>>>>>>>> ");
    }

}
